-
Notifications
You must be signed in to change notification settings - Fork 590
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: rework pubsub source to support parallel read and at-least-once #16733
Conversation
Signed-off-by: xxchan <[email protected]>
- (message) change the offset column from ts to ack_id - (split) don't update start_offset - (reader) don't ack on read - refactor WaitCheckpointWorker to support update_task_on_chunk Signed-off-by: xxchan <[email protected]>
Signed-off-by: xxchan <[email protected]>
Signed-off-by: xxchan <[email protected]>
Signed-off-by: xxchan <[email protected]>
Signed-off-by: xxchan <[email protected]>
Signed-off-by: xxchan <[email protected]>
Signed-off-by: xxchan <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the pr generally LGTM
please update in the pr's note that pubsub does not rely on the SplitImpl
's offset to start from a position. It prefers letting the broker side deal with offset, which is different from other connectors.
btw, idk if reusing offset
in the SplitImpl
is a good practice, it may be confusing if users want to query from the state table.
Sure. BTW, I've mentioned that it will be stateless from the RisingWave side.
I don't understand your point. What do you mean by reusing Anyway, I don't think any average user care about query from state table. |
/// pubsub reader. | ||
pub(crate) start_offset: Option<String>, | ||
|
||
/// `stop_offset` is a numeric timestamp. | ||
/// When not `None`, the `PubsubReader` stops reading messages when the `offset` property of | ||
/// the `SourceMessage` is greater than or equal to the `stop_offset`. | ||
pub(crate) stop_offset: Option<String>, | ||
#[serde(rename = "stop_offset")] | ||
#[serde(skip_serializing)] | ||
pub(crate) __deprecated_stop_offset: Option<String>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess start_offset
can also be deprecated.
stop_offset
is actually never used.
Signed-off-by: xxchan <[email protected]>
Signed-off-by: xxchan <[email protected]>
Signed-off-by: xxchan <[email protected]>
Signed-off-by: xxchan <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM to WaitCheckpointWorker
…limit: 524288 bytes. Signed-off-by: xxchan <[email protected]>
Signed-off-by: xxchan [email protected]I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
close #16572
Behavior changes
ts
toack_id
seek
)WaitCheckpointWorker
to supportupdate_task_on_chunk
: store allack_ids
before next checkpointpubsub.parallelism
WITH option. (Previously calledsplit_count
, but I likeparallelism
better, because "split" is not a user-facing concept feat: pre-release pubsub #14531)SourceEnumerator
enumContext.currentParallelism()
. We don't have such a context in source manager yet.retain_acked_messages
.Note: although there are huge changes, I think all are none breaking.
Explanations
Previously, we ack immediately and rely on
retain_acked_messages
+seek
to timestamp to replay messages. This is awkward, and not like designed usage.Now we ack on checkpoint and don't
seek
at all. It's stateless on RisingWave's side. This corresponds more the queueing semantics (delete after ack).WaitCheckpointWorker
Testing
checkpoint_frequency
to very large (andset visibility_mode TO 'all'
). Create Table and check load successfully. Kill and restart RisingWave, check the data can still be loaded without missing data (probably duplicated).Checklist
./risedev check
(or alias,./risedev c
)Documentation
Release note
Add
pubsub.parallelism
WITH
option for PubSub source, which specifies the number of parallel consumers to run for the subscription. If not specified, the parallelism will be one.